// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "runtime/fragment_mgr.h"

#include <gperftools/profiler.h>
#include <thrift/protocol/TDebugProtocol.h>

#include <memory>
#include <sstream>

#include "agent/cgroups_mgr.h"
#include "common/object_pool.h"
#include "common/resource_tls.h"
#include "gen_cpp/DataSinks_types.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/HeartbeatService.h"
#include "gen_cpp/PaloInternalService_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/QueryPlanExtra_types.h"
#include "gen_cpp/Types_types.h"
#include "gutil/strings/substitute.h"
#include "runtime/client_cache.h"
#include "runtime/datetime_value.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/plan_fragment_executor.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/stream_load/stream_load_pipe.h"
#include "service/backend_options.h"
#include "util/debug_util.h"
#include "util/doris_metrics.h"
#include "util/stopwatch.hpp"
#include "util/threadpool.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
#include "util/url_coding.h"

namespace doris {

// Gauge prototypes used by FragmentMgr in this file:
//  - plan_fragment_count: registered as a hook metric that reports the current
//    size of the fragment map.
//  - timeout_canceled_fragment_count: incremented by the cancel worker with the
//    number of fragments it is about to cancel because they timed out.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(plan_fragment_count, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT);

// Builds the HTTP URL on this backend's web server under which the load error
// log `file_name` is exposed. An empty `file_name` yields an empty string.
std::string to_load_error_http_path(const std::string& file_name) {
    if (!file_name.empty()) {
        std::stringstream out;
        out << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
            << "/api/_load_error_log?"
            << "file=" << file_name;
        return out.str();
    }
    return "";
}

using apache::thrift::TException;
using apache::thrift::TProcessor;
using apache::thrift::transport::TTransportException;

class RuntimeProfile;

// Execution state of a single plan fragment instance on this backend.
// It owns the PlanFragmentExecutor that runs the fragment, remembers the
// coordinator address that status reports are sent to, and keeps the first
// non-OK status reported for the instance.
class FragmentExecState {
public:
    // Constructor by using QueryFragmentsCtx; the coordinator address is taken
    // from `fragments_ctx`.
    FragmentExecState(const TUniqueId& query_id, const TUniqueId& instance_id, int backend_num,
                      ExecEnv* exec_env, std::shared_ptr<QueryFragmentsCtx> fragments_ctx);

    // Constructor for requests that carry the coordinator address themselves
    // (no shared QueryFragmentsCtx).
    FragmentExecState(const TUniqueId& query_id, const TUniqueId& instance_id, int backend_num,
                      ExecEnv* exec_env, const TNetworkAddress& coord_addr);

    ~FragmentExecState();

    // Reads the timeout (and, without a shared context, the resource info) from
    // `params` and prepares the executor.
    Status prepare(const TExecPlanFragmentParams& params);

    // Currently unused; the implementation is empty.
    void callback(const Status& status, RuntimeProfile* profile, bool done);

    // Returns the download URL on this backend for `file_name`.
    std::string to_http_path(const std::string& file_name);

    // Opens and closes the executor and updates the fragment request metrics.
    Status execute();

    // Aborts and cancels the executor (and the pipe, if any) for a fragment
    // that never got to run.
    Status cancel_before_execute();

    // Cancels the executor (and the pipe, if any). Returns the recorded error
    // instead of cancelling when the fragment has already failed.
    Status cancel(const PPlanFragmentCancelReason& reason);

    TUniqueId fragment_instance_id() const { return _fragment_instance_id; }

    TUniqueId query_id() const { return _query_id; }

    PlanFragmentExecutor* executor() { return &_executor; }

    const DateTimeValue& start_time() const { return _start_time; }

    void set_merge_controller_handler(
            const std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
        _merge_controller_handler = handler;
    }

    // Update status of this fragment execute.
    // Only the first non-OK status is kept; the currently recorded status is returned.
    Status update_status(Status status) {
        std::lock_guard<std::mutex> l(_status_lock);
        if (!status.ok() && _exec_status.ok()) {
            _exec_status = status;
        }
        return _exec_status;
    }

    // Records the user/group of the request and marks the resource info as set.
    void set_group(const TResourceInfo& info) {
        _set_rsc_info = true;
        _user = info.user;
        _group = info.group;
    }

    // Returns true when a positive timeout is configured and more than that many
    // seconds have passed between the start time and `now`.
    bool is_timeout(const DateTimeValue& now) const {
        return _timeout_second > 0 && now.second_diff(_start_time) > _timeout_second;
    }

    int get_timeout_second() const { return _timeout_second; }

    std::shared_ptr<QueryFragmentsCtx> get_fragments_ctx() { return _fragments_ctx; }

    void set_pipe(std::shared_ptr<StreamLoadPipe> pipe) { _pipe = pipe; }
    std::shared_ptr<StreamLoadPipe> get_pipe() const { return _pipe; }

private:
    // Status-report callback handed to the executor; sends the report to the coordinator.
    void coordinator_callback(const Status& status, RuntimeProfile* profile, bool done);

    // Id of this query
    TUniqueId _query_id;
    // Id of this instance
    TUniqueId _fragment_instance_id;
    // Used to report to coordinator which backend is over
    int _backend_num;
    ExecEnv* _exec_env;
    TNetworkAddress _coord_addr;

    PlanFragmentExecutor _executor;
    DateTimeValue _start_time;

    // Guards _exec_status.
    std::mutex _status_lock;
    Status _exec_status;

    // Defaulted here so that every constructor leaves it in a defined state.
    bool _set_rsc_info = false;
    std::string _user;
    std::string _group;

    // Query timeout in seconds; values <= 0 disable the timeout check.
    int _timeout_second = -1;

    std::unique_ptr<std::thread> _exec_thread;

    // This context is shared by all fragments of this host in a query
    std::shared_ptr<QueryFragmentsCtx> _fragments_ctx;

    std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
    // The pipe for data transferring, such as insert.
    std::shared_ptr<StreamLoadPipe> _pipe;
};

// Creates the state for a fragment instance that shares a QueryFragmentsCtx
// with the other fragments of the same query on this host. The coordinator
// address is copied from that context.
//
// The initializer list follows the member declaration order (members are
// always initialized in declaration order regardless of how the list is written).
FragmentExecState::FragmentExecState(const TUniqueId& query_id,
                                     const TUniqueId& fragment_instance_id, int backend_num,
                                     ExecEnv* exec_env,
                                     std::shared_ptr<QueryFragmentsCtx> fragments_ctx)
        : _query_id(query_id),
          _fragment_instance_id(fragment_instance_id),
          _backend_num(backend_num),
          _exec_env(exec_env),
          _executor(exec_env, std::bind<void>(std::mem_fn(&FragmentExecState::coordinator_callback),
                                              this, std::placeholders::_1, std::placeholders::_2,
                                              std::placeholders::_3)),
          _start_time(DateTimeValue::local_time()),
          _set_rsc_info(false),
          _timeout_second(-1),
          _fragments_ctx(std::move(fragments_ctx)) {
    // _coord_addr is declared before _fragments_ctx, so it can only be filled in
    // from the member once the initializer list has run.
    _coord_addr = _fragments_ctx->coord_addr;
}

// Creates the state for a fragment instance whose request carries the
// coordinator address directly; no QueryFragmentsCtx is attached.
FragmentExecState::FragmentExecState(const TUniqueId& query_id,
                                     const TUniqueId& fragment_instance_id, int backend_num,
                                     ExecEnv* exec_env, const TNetworkAddress& coord_addr)
        : _query_id(query_id),
          _fragment_instance_id(fragment_instance_id),
          _backend_num(backend_num),
          _exec_env(exec_env),
          _coord_addr(coord_addr),
          _executor(exec_env, std::bind<void>(std::mem_fn(&FragmentExecState::coordinator_callback),
                                              this, std::placeholders::_1, std::placeholders::_2,
                                              std::placeholders::_3)),
          _start_time(DateTimeValue::local_time()),
          // Previously left uninitialized by this constructor.
          _set_rsc_info(false),
          _timeout_second(-1) {}

// Nothing to release explicitly; members clean up after themselves.
FragmentExecState::~FragmentExecState() = default;

// Picks up the query timeout from `params` and prepares the executor.
// Without a shared QueryFragmentsCtx the resource info (user/group) is taken
// from `params` as well; with one, the context is passed on to the executor.
Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) {
    if (params.__isset.query_options) {
        _timeout_second = params.query_options.query_timeout;
    }

    if (_fragments_ctx != nullptr) {
        return _executor.prepare(params, _fragments_ctx.get());
    }

    if (params.__isset.resource_info) {
        set_group(params.resource_info);
    }
    return _executor.prepare(params);
}

// Runs the fragment: opens the executor (a failure is only logged as a
// warning), closes it, and then records one more fragment request together
// with the time spent, converted from nanoseconds to microseconds.
// Always returns OK.
Status FragmentExecState::execute() {
    int64_t elapsed_ns = 0;
    {
        // The timer covers everything inside this scope.
        SCOPED_RAW_TIMER(&elapsed_ns);
        CgroupsMgr::apply_system_cgroup();
        WARN_IF_ERROR(_executor.open(), strings::Substitute("Got error while opening fragment $0",
                                                            print_id(_fragment_instance_id)));
        _executor.close();
    }
    const int64_t elapsed_us = elapsed_ns / 1000;
    DorisMetrics::instance()->fragment_requests_total->increment(1);
    DorisMetrics::instance()->fragment_request_duration_us->increment(elapsed_us);
    return Status::OK();
}

// Cancels a fragment that has not been executed. The executor is marked as
// aborted first, because cancel() alone does not affect the status argument of
// DataSink::close(). An attached pipe is cancelled too. Always returns OK.
Status FragmentExecState::cancel_before_execute() {
    _executor.set_abort();
    _executor.cancel();
    if (_pipe) {
        _pipe->cancel();
    }
    return Status::OK();
}

// Cancels the running fragment. If an error has already been recorded for the
// fragment, that error is returned and nothing is cancelled. When the reason is
// LIMIT_REACH the executor is told not to report on cancel. The status lock is
// held for the whole call.
Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason) {
    std::lock_guard<std::mutex> guard(_status_lock);
    RETURN_IF_ERROR(_exec_status);

    const bool limit_reached = (reason == PPlanFragmentCancelReason::LIMIT_REACH);
    if (limit_reached) {
        _executor.set_is_report_on_cancel(false);
    }

    _executor.cancel();
    if (_pipe) {
        _pipe->cancel();
    }
    return Status::OK();
}

// Intentionally a no-op.
void FragmentExecState::callback(const Status& /*status*/, RuntimeProfile* /*profile*/,
                                 bool /*done*/) {}

// Builds the download URL on this backend's web server for `file_name`,
// including the token obtained from the exec env.
std::string FragmentExecState::to_http_path(const std::string& file_name) {
    std::stringstream out;
    out << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
        << "/api/_download_load?"
        << "token=" << _exec_env->token() << "&file=" << file_name;
    return out.str();
}

// Reports the execution status of this fragment instance to the coordinator
// through the reportExecStatus RPC.
//
// There can only be one of these callbacks in-flight at any moment, because
// it is only invoked from the executor's reporting thread.
// Also, the reported status will always reflect the most recent execution status,
// including the final status when execution finishes.
//
// status:  status handed in by the executor; merged into the recorded status
//          through update_status() before it is reported.
// profile: runtime profile attached to a full report; may be nullptr.
// done:    whether execution has finished; a non-OK status implies done.
//
// If no client for the coordinator can be obtained, the error is recorded and
// nothing is sent. If the RPC fails, or the coordinator answers with a non-OK
// status, the error is recorded and the executor is cancelled.
void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfile* profile,
                                             bool done) {
    DCHECK(status.ok() || done); // if !status.ok() => done
    // Merge the incoming status; exec_status is what actually gets reported.
    Status exec_status = update_status(status);

    Status coord_status;
    FrontendServiceConnection coord(_exec_env->frontend_client_cache(), _coord_addr, &coord_status);
    if (!coord_status.ok()) {
        std::stringstream ss;
        ss << "couldn't get a client for " << _coord_addr;
        update_status(Status::InternalError(ss.str()));
        return;
    }

    // Fields that are part of every report.
    TReportExecStatusParams params;
    params.protocol_version = FrontendServiceVersion::V1;
    params.__set_query_id(_query_id);
    params.__set_backend_num(_backend_num);
    params.__set_fragment_instance_id(_fragment_instance_id);
    exec_status.set_t_status(&params);
    params.__set_done(done);

    RuntimeState* runtime_state = _executor.runtime_state();
    DCHECK(runtime_state != NULL);
    if (runtime_state->query_options().query_type == TQueryType::LOAD && !done && status.ok()) {
        // this is a load plan, and load is not finished, just make a brief report
        params.__set_loaded_rows(runtime_state->num_rows_load_total());
    } else {
        // Full report: profile, output files, load counters, commit infos and errors.
        if (runtime_state->query_options().query_type == TQueryType::LOAD) {
            params.__set_loaded_rows(runtime_state->num_rows_load_total());
        }
        if (profile == nullptr) {
            params.__isset.profile = false;
        } else {
            profile->to_thrift(&params.profile);
            params.__isset.profile = true;
        }

        // Output files are reported as download URLs on this backend.
        if (!runtime_state->output_files().empty()) {
            params.__isset.delta_urls = true;
            for (auto& it : runtime_state->output_files()) {
                params.delta_urls.push_back(to_http_path(it));
            }
        }
        // Load counters are only sent when at least one row was loaded or filtered.
        if (runtime_state->num_rows_load_total() > 0 ||
            runtime_state->num_rows_load_filtered() > 0) {
            params.__isset.load_counters = true;
            // TODO(zc)
            static std::string s_dpp_normal_all = "dpp.norm.ALL";
            static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
            static std::string s_unselected_rows = "unselected.rows";

            params.load_counters.emplace(s_dpp_normal_all,
                                         std::to_string(runtime_state->num_rows_load_success()));
            params.load_counters.emplace(s_dpp_abnormal_all,
                                         std::to_string(runtime_state->num_rows_load_filtered()));
            params.load_counters.emplace(s_unselected_rows,
                                         std::to_string(runtime_state->num_rows_load_unselected()));
        }
        // The error log, if any, is exposed through a tracking URL on this backend.
        if (!runtime_state->get_error_log_file_path().empty()) {
            params.__set_tracking_url(
                    to_load_error_http_path(runtime_state->get_error_log_file_path()));
        }
        if (!runtime_state->export_output_files().empty()) {
            params.__isset.export_files = true;
            params.export_files = runtime_state->export_output_files();
        }
        if (!runtime_state->tablet_commit_infos().empty()) {
            params.__isset.commitInfos = true;
            params.commitInfos.reserve(runtime_state->tablet_commit_infos().size());
            for (auto& info : runtime_state->tablet_commit_infos()) {
                params.commitInfos.push_back(info);
            }
        }

        // Send new errors to coordinator
        runtime_state->get_unreported_errors(&(params.error_log));
        params.__isset.error_log = (params.error_log.size() > 0);
    }

    if (_exec_env->master_info()->__isset.backend_id) {
        params.__set_backend_id(_exec_env->master_info()->backend_id);
    }

    TReportExecStatusResult res;
    Status rpc_status;

    VLOG_ROW << "debug: reportExecStatus params is "
             << apache::thrift::ThriftDebugString(params).c_str();
    // The outer try turns any thrift exception into an error status; the inner
    // try retries the RPC once after reopening the connection on a transport error.
    try {
        try {
            coord->reportExecStatus(res, params);
        } catch (TTransportException& e) {
            LOG(WARNING) << "Retrying ReportExecStatus: " << e.what();
            rpc_status = coord.reopen();

            if (!rpc_status.ok()) {
                // we need to cancel the execution of this fragment
                update_status(rpc_status);
                _executor.cancel();
                return;
            }
            coord->reportExecStatus(res, params);
        }

        // The status returned by the coordinator decides whether to keep running.
        rpc_status = Status(res.status);
    } catch (TException& e) {
        std::stringstream msg;
        msg << "ReportExecStatus() to " << _coord_addr << " failed:\n" << e.what();
        LOG(WARNING) << msg.str();
        rpc_status = Status::InternalError(msg.str());
    }

    if (!rpc_status.ok()) {
        // we need to cancel the execution of this fragment
        update_status(rpc_status);
        _executor.cancel();
    }
}

// Sets up the fragment manager: registers its metrics, starts the background
// thread that cancels timed-out fragments, and builds the thread pool that
// fragments are executed on. Failure to create the thread or the pool is fatal.
FragmentMgr::FragmentMgr(ExecEnv* exec_env)
        : _exec_env(exec_env),
          _fragment_map(),
          _fragments_ctx_map(),
          _stop_background_threads_latch(1) {
    _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
    INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
    // The gauge reads the number of tracked fragments under the manager lock.
    REGISTER_HOOK_METRIC(plan_fragment_count, [this]() {
        std::lock_guard<std::mutex> guard(_lock);
        return _fragment_map.size();
    });

    auto thread_st = Thread::create(
            "FragmentMgr", "cancel_timeout_plan_fragment", [this]() { cancel_worker(); },
            &_cancel_thread);
    CHECK(thread_st.ok()) << thread_st.to_string();

    // TODO(zc): we need a better thread-pool
    // now one user can use all the thread pool, others have no resource.
    auto pool_st = ThreadPoolBuilder("FragmentMgrThreadPool")
                           .set_min_threads(config::fragment_pool_thread_num_min)
                           .set_max_threads(config::fragment_pool_thread_num_max)
                           .set_max_queue_size(config::fragment_pool_queue_size)
                           .build(&_thread_pool);
    CHECK(pool_st.ok()) << pool_st.to_string();
}

// Tears the manager down: deregisters the hook metric, stops and joins the
// cancel worker, shuts the thread pool down and drops all tracked state.
FragmentMgr::~FragmentMgr() {
    DEREGISTER_HOOK_METRIC(plan_fragment_count);

    // Counting the latch down lets cancel_worker() leave its loop.
    _stop_background_threads_latch.count_down();
    if (_cancel_thread) {
        _cancel_thread->join();
    }

    // Stop all the workers; whether to wait for a while first is an open question.
    // _thread_pool->wait_for();
    _thread_pool->shutdown();

    // Only the manager itself may clear the maps.
    std::lock_guard<std::mutex> guard(_lock);
    _fragment_map.clear();
    _fragments_ctx_map.clear();
}

static void empty_function(PlanFragmentExecutor* exec) {}

void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb) {
    exec_state->execute();

    std::shared_ptr<QueryFragmentsCtx> fragments_ctx = exec_state->get_fragments_ctx();
    bool all_done = false;
    if (fragments_ctx != nullptr) {
        // decrease the number of unfinished fragments
        all_done = fragments_ctx->countdown();
    }

    // remove exec state after this fragment finished
    {
        std::lock_guard<std::mutex> lock(_lock);
        _fragment_map.erase(exec_state->fragment_instance_id());
        if (all_done && fragments_ctx) {
            _fragments_ctx_map.erase(fragments_ctx->query_id);
        }
    }

    // Callback after remove from this id
    cb(exec_state->executor());
}

// Entry point for executing a plan fragment without a finish callback.
// Requests that need a transaction are routed through the stream load
// machinery with a freshly created pipe; all other requests are forwarded to
// the callback overload with a no-op callback.
Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
    if (!params.txn_conf.need_txn) {
        return exec_plan_fragment(params, std::bind<void>(&empty_function, std::placeholders::_1));
    }

    // NOTE(review): the context is a raw allocation; who releases it when one of
    // the early returns below is taken is not visible from here - confirm.
    StreamLoadContext* load_ctx = new StreamLoadContext(_exec_env);
    load_ctx->db = params.txn_conf.db;
    load_ctx->db_id = params.txn_conf.db_id;
    load_ctx->table = params.txn_conf.tbl;
    load_ctx->txn_id = params.txn_conf.txn_id;
    load_ctx->id = UniqueId(params.params.query_id);
    load_ctx->put_result.params = params;
    load_ctx->use_streaming = true;
    load_ctx->load_type = TLoadType::MANUL_LOAD;
    load_ctx->load_src_type = TLoadSourceType::RAW;
    load_ctx->label = params.import_label;
    load_ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;
    load_ctx->timeout_second = 3600;
    load_ctx->auth.auth_code_uuid = params.txn_conf.auth_code_uuid;
    load_ctx->need_commit_self = true;
    load_ctx->need_rollback = true;
    load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;

    // total_length == -1 means read one message from pipe in once time, don't care the length.
    auto load_pipe = std::make_shared<StreamLoadPipe>(1024 * 1024 /* max_buffered_bytes */,
                                                      64 * 1024 /* min_chunk_size */,
                                                      -1 /* total_length */, true /* use_proto */);
    load_ctx->body_sink = load_pipe;

    RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(load_ctx->id, load_pipe));
    RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(load_ctx, load_pipe));

    // Attach the pipe to the fragment state, if the fragment is registered.
    set_pipe(params.params.fragment_instance_id, load_pipe);
    return Status::OK();
}

// Attaches `pipe` to the fragment instance `fragment_instance_id`.
// Does nothing when no such fragment is currently registered.
void FragmentMgr::set_pipe(const TUniqueId& fragment_instance_id,
                           std::shared_ptr<StreamLoadPipe> pipe) {
    std::lock_guard<std::mutex> lock(_lock);
    auto iter = _fragment_map.find(fragment_instance_id);
    if (iter != _fragment_map.end()) {
        // Use the iterator already at hand instead of a second lookup via operator[].
        iter->second->set_pipe(pipe);
    }
}

// Returns the pipe attached to the fragment instance `fragment_instance_id`,
// or nullptr when no such fragment is currently registered.
std::shared_ptr<StreamLoadPipe> FragmentMgr::get_pipe(const TUniqueId& fragment_instance_id) {
    std::lock_guard<std::mutex> lock(_lock);
    auto iter = _fragment_map.find(fragment_instance_id);
    if (iter == _fragment_map.end()) {
        return nullptr;
    }
    // Use the iterator already at hand instead of a second lookup via operator[].
    return iter->second->get_pipe();
}

// Registers a plan fragment instance and schedules it on the thread pool.
//
// params: the fragment request. Three shapes are handled:
//   - is_simplified_param not set: old-style request carrying all common
//     components itself;
//   - is_simplified_param == true: the common components are looked up in the
//     query fragments context created by an earlier request of the same query;
//   - is_simplified_param == false: possibly the first request of the query;
//     the query fragments context is created (or an existing one is reused).
// cb: invoked with the fragment's executor after the fragment has finished and
//     has been removed from the fragment map.
//
// Returns OK when the fragment was scheduled, or when an instance with the same
// id is already registered. Returns an error when the query context is missing,
// when preparing the fragment fails, or when the thread pool rejects the task.
Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb) {
    const TUniqueId& fragment_instance_id = params.params.fragment_instance_id;
    {
        std::lock_guard<std::mutex> lock(_lock);
        auto iter = _fragment_map.find(fragment_instance_id);
        if (iter != _fragment_map.end()) {
            // Duplicated
            return Status::OK();
        }
    }

    std::shared_ptr<FragmentExecState> exec_state;
    if (!params.__isset.is_simplified_param) {
        // This is an old version params, all @Common components is set in TExecPlanFragmentParams.
        exec_state.reset(new FragmentExecState(params.params.query_id, fragment_instance_id,
                                               params.backend_num, _exec_env, params.coord));
    } else {
        std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
        if (params.is_simplified_param) {
            // Get common components from _fragments_ctx_map
            std::lock_guard<std::mutex> lock(_lock);
            auto search = _fragments_ctx_map.find(params.params.query_id);
            if (search == _fragments_ctx_map.end()) {
                // The "$0" placeholder is required for the host to show up in the message.
                return Status::InternalError(
                        strings::Substitute("Failed to get query fragments context. Query may be "
                                            "timeout or be cancelled. host: $0",
                                            BackendOptions::get_localhost()));
            }
            fragments_ctx = search->second;
        } else {
            // This may be a first fragment request of the query.
            // Create the query fragments context.
            fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host, _exec_env));
            fragments_ctx->query_id = params.params.query_id;
            RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl,
                                                  &(fragments_ctx->desc_tbl)));
            fragments_ctx->coord_addr = params.coord;
            fragments_ctx->query_globals = params.query_globals;

            if (params.__isset.resource_info) {
                fragments_ctx->user = params.resource_info.user;
                fragments_ctx->group = params.resource_info.group;
                fragments_ctx->set_rsc_info = true;
            }

            if (params.__isset.query_options) {
                fragments_ctx->timeout_second = params.query_options.query_timeout;
                if (params.query_options.__isset.resource_limit) {
                    fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit);
                }
            }

            {
                // Find _fragments_ctx_map again, in case some other request has already
                // create the query fragments context.
                std::lock_guard<std::mutex> lock(_lock);
                auto search = _fragments_ctx_map.find(params.params.query_id);
                if (search == _fragments_ctx_map.end()) {
                    _fragments_ctx_map.insert(
                            std::make_pair(fragments_ctx->query_id, fragments_ctx));
                } else {
                    // Already has a query fragments context, use it
                    fragments_ctx = search->second;
                }
            }
        }

        exec_state.reset(new FragmentExecState(fragments_ctx->query_id, fragment_instance_id,
                                               params.backend_num, _exec_env, fragments_ctx));
    }

    std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
    _runtimefilter_controller.add_entity(params, &handler);
    exec_state->set_merge_controller_handler(handler);

    RETURN_IF_ERROR(exec_state->prepare(params));
    {
        // Register the fragment before it is scheduled so that it can be found
        // (e.g. for cancellation) as soon as it may run.
        std::lock_guard<std::mutex> lock(_lock);
        _fragment_map.insert(std::make_pair(fragment_instance_id, exec_state));
    }

    auto st = _thread_pool->submit_func(
            std::bind<void>(&FragmentMgr::_exec_actual, this, exec_state, cb));
    if (!st.ok()) {
        {
            // Remove the exec state added
            std::lock_guard<std::mutex> lock(_lock);
            _fragment_map.erase(fragment_instance_id);
        }
        exec_state->cancel_before_execute();
        return Status::InternalError(strings::Substitute(
                "Put planfragment to thread pool failed. err = $0", st.get_error_msg()));
    }

    return Status::OK();
}

// Cancels the fragment instance `fragment_id` for the given reason.
// An unknown id is not an error. The lookup happens under the manager lock,
// the cancellation itself outside of it. Always returns OK; the status of the
// fragment's own cancel() is not propagated.
Status FragmentMgr::cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason) {
    std::shared_ptr<FragmentExecState> target;
    {
        std::lock_guard<std::mutex> guard(_lock);
        auto found = _fragment_map.find(fragment_id);
        if (found == _fragment_map.end()) {
            // Nothing registered under this id.
            return Status::OK();
        }
        target = found->second;
    }

    target->cancel(reason);
    return Status::OK();
}

void FragmentMgr::cancel_worker() {
    LOG(INFO) << "FragmentMgr cancel worker start working.";
    do {
        std::vector<TUniqueId> to_cancel;
        DateTimeValue now = DateTimeValue::local_time();
        {
            std::lock_guard<std::mutex> lock(_lock);
            for (auto& it : _fragment_map) {
                if (it.second->is_timeout(now)) {
                    to_cancel.push_back(it.second->fragment_instance_id());
                }
            }
            for (auto it = _fragments_ctx_map.begin(); it != _fragments_ctx_map.end();) {
                if (it->second->is_timeout(now)) {
                    it = _fragments_ctx_map.erase(it);
                } else {
                    ++it;
                }
            }
        }
        timeout_canceled_fragment_count->increment(to_cancel.size());
        for (auto& id : to_cancel) {
            cancel(id, PPlanFragmentCancelReason::TIMEOUT);
            LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout fragment "
                      << print_id(id);
        }
    } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(1)));
    LOG(INFO) << "FragmentMgr cancel worker is going to exit.";
}

void FragmentMgr::debug(std::stringstream& ss) {
    // Keep things simple
    std::lock_guard<std::mutex> lock(_lock);

    ss << "FragmentMgr have " << _fragment_map.size() << " jobs.\n";
    ss << "job_id\t\tstart_time\t\texecute_time(s)\n";
    DateTimeValue now = DateTimeValue::local_time();
    for (auto& it : _fragment_map) {
        ss << it.first << "\t" << it.second->start_time().debug_string() << "\t"
           << now.second_diff(it.second->start_time()) << "\n";
    }
}

/*
 * Executes a plan fragment on behalf of an external scan request.
 * 1. resolve opaqued_query_plan (base64 encoded, thrift serialized) to thrift structure
 * 2. build TExecPlanFragmentParams and hand it to exec_plan_fragment()
 *
 * params:               the scan open request (plan, database, table, tablet ids, limits).
 * fragment_instance_id: id under which the fragment instance is executed.
 * selected_columns:     output; one entry (name and type) is appended for every
 *                       slot of tuple descriptor 0 of the plan.
 *
 * Returns InvalidArgument when the plan cannot be decoded, deserialized or its
 * descriptors cannot be built, NotFound when a requested tablet is missing from
 * the plan's tablet info, and otherwise the result of exec_plan_fragment().
 */
Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
                                                const TUniqueId& fragment_instance_id,
                                                std::vector<TScanColumnDesc>* selected_columns) {
    const std::string& opaqued_query_plan = params.opaqued_query_plan;
    std::string query_plan_info;
    // base64 decode query plan
    if (!base64_decode(opaqued_query_plan, &query_plan_info)) {
        LOG(WARNING) << "open context error: base64_decode decode opaqued_query_plan failure";
        std::stringstream msg;
        msg << "query_plan_info: " << query_plan_info
            << " validate error, should not be modified after returned Doris FE processed";
        return Status::InvalidArgument(msg.str());
    }
    TQueryPlanInfo t_query_plan_info;
    const uint8_t* buf = reinterpret_cast<const uint8_t*>(query_plan_info.data());
    uint32_t len = query_plan_info.size();
    // deserialize TQueryPlanInfo
    auto st = deserialize_thrift_msg(buf, &len, false, &t_query_plan_info);
    if (!st.ok()) {
        LOG(WARNING) << "open context error: deserialize TQueryPlanInfo failure";
        std::stringstream msg;
        msg << "query_plan_info: " << query_plan_info
            << " deserialize error, should not be modified after returned Doris FE processed";
        return Status::InvalidArgument(msg.str());
    }

    // set up desc tbl
    DescriptorTbl* desc_tbl = nullptr;
    ObjectPool obj_pool;
    st = DescriptorTbl::create(&obj_pool, t_query_plan_info.desc_tbl, &desc_tbl);
    if (!st.ok()) {
        LOG(WARNING) << "open context error: extract DescriptorTbl failure";
        std::stringstream msg;
        msg << "query_plan_info: " << query_plan_info
            << " create DescriptorTbl error, should not be modified after returned Doris FE "
               "processed";
        return Status::InvalidArgument(msg.str());
    }
    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
    if (tuple_desc == nullptr) {
        LOG(WARNING) << "open context error: extract TupleDescriptor failure";
        std::stringstream msg;
        msg << "query_plan_info: " << query_plan_info
            << " get  TupleDescriptor error, should not be modified after returned Doris FE "
               "processed";
        return Status::InvalidArgument(msg.str());
    }
    // process selected columns from slots
    for (const SlotDescriptor* slot : tuple_desc->slots()) {
        TScanColumnDesc col;
        col.__set_name(slot->col_name());
        col.__set_type(to_thrift(slot->type().type));
        selected_columns->emplace_back(std::move(col));
    }

    LOG(INFO) << "BackendService execute open()  TQueryPlanInfo: "
              << apache::thrift::ThriftDebugString(t_query_plan_info);
    // assign the param used to execute PlanFragment
    TExecPlanFragmentParams exec_fragment_params;
    exec_fragment_params.protocol_version = static_cast<PaloInternalServiceVersion::type>(0);
    exec_fragment_params.__set_fragment(t_query_plan_info.plan_fragment);
    exec_fragment_params.__set_desc_tbl(t_query_plan_info.desc_tbl);

    // assign the param used for executing of PlanFragment-self
    TPlanFragmentExecParams fragment_exec_params;
    fragment_exec_params.query_id = t_query_plan_info.query_id;
    fragment_exec_params.fragment_instance_id = fragment_instance_id;

    // Every scan range is served by this backend.
    TNetworkAddress address;
    address.hostname = BackendOptions::get_localhost();
    address.port = doris::config::be_port;

    // Build one scan range per requested tablet; the version information comes
    // from the plan. The tablet info map is only read, so it is not copied.
    const auto& tablet_info = t_query_plan_info.tablet_info;
    std::vector<TScanRangeParams> scan_ranges;
    scan_ranges.reserve(params.tablet_ids.size());
    for (auto tablet_id : params.tablet_ids) {
        auto iter = tablet_info.find(tablet_id);
        if (iter == tablet_info.end()) {
            std::stringstream msg;
            msg << "tablet_id: " << tablet_id << " not found";
            LOG(WARNING) << "tablet_id [ " << tablet_id << " ] not found";
            return Status::NotFound(msg.str());
        }
        const TTabletVersionInfo& info = iter->second;

        TPaloScanRange scan_range;
        scan_range.db_name = params.database;
        scan_range.table_name = params.table;
        scan_range.tablet_id = tablet_id;
        scan_range.version = std::to_string(info.version);
        scan_range.version_hash = std::to_string(info.version_hash);
        scan_range.schema_hash = std::to_string(info.schema_hash);
        scan_range.hosts.push_back(address);

        TScanRange doris_scan_range;
        doris_scan_range.__set_palo_scan_range(scan_range);
        TScanRangeParams scan_range_params;
        scan_range_params.scan_range = doris_scan_range;
        scan_ranges.push_back(scan_range_params);
    }

    // All scan ranges are attached to plan node 0.
    std::map<::doris::TPlanNodeId, std::vector<TScanRangeParams>> per_node_scan_ranges;
    per_node_scan_ranges.insert(
            std::make_pair(static_cast<::doris::TPlanNodeId>(0), std::move(scan_ranges)));
    fragment_exec_params.per_node_scan_ranges = std::move(per_node_scan_ranges);
    exec_fragment_params.__set_params(fragment_exec_params);

    // batch_size for one RowBatch
    TQueryOptions query_options;
    query_options.batch_size = params.batch_size;
    query_options.query_timeout = params.query_timeout;
    query_options.mem_limit = params.mem_limit;
    query_options.query_type = TQueryType::EXTERNAL;
    exec_fragment_params.__set_query_options(query_options);
    VLOG_ROW << "external exec_plan_fragment params is "
             << apache::thrift::ThriftDebugString(exec_fragment_params).c_str();
    return exec_plan_fragment(exec_fragment_params);
}

// Applies a published runtime filter to the fragment instance named in
// `request`. Returns InvalidArgument when that fragment is not registered;
// otherwise the result of the fragment's runtime filter manager.
Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, const char* data) {
    UniqueId fragment_instance_id = request->fragment_id();
    TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();

    std::shared_ptr<FragmentExecState> fragment_state;
    {
        // Only the lookup happens under the manager lock.
        std::lock_guard<std::mutex> guard(_lock);
        auto found = _fragment_map.find(tfragment_instance_id);
        if (found == _fragment_map.end()) {
            LOG(WARNING) << "unknown.... fragment-id:" << fragment_instance_id;
            return Status::InvalidArgument("fragment-id");
        }
        fragment_state = found->second;
    }
    DCHECK(fragment_state != nullptr);

    RuntimeFilterMgr* filter_mgr =
            fragment_state->executor()->runtime_state()->runtime_filter_mgr();
    return filter_mgr->update_filter(request, data);
}

// Forwards a runtime filter merge request to the merge controller entity of
// the query named in `request`. Fails when the entity cannot be acquired or
// when the merge itself fails.
Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, const char* attach_data) {
    UniqueId query_id = request->query_id();

    std::shared_ptr<RuntimeFilterMergeControllerEntity> merge_controller;
    RETURN_IF_ERROR(_runtimefilter_controller.acquire(query_id, &merge_controller));

    RETURN_IF_ERROR(merge_controller->merge(request, attach_data));
    return Status::OK();
}

} // namespace doris
